Running pipelines

On the server, pipelines run periodically and according to their pipeline and data dependencies.

You can also run specific pipelines manually, either during development or to run custom pipelines.

Change directory to project root

The Jupyter notebooks run in the jupyter-notebooks directory. To run pipelines, you first need to change to its parent directory, the project root.


In [1]:
import os

os.chdir('..')
os.getcwd()


Out[1]:
'/home/ori/knesset-data-pipelines'
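`os.chdir('..')` moves up one level every time the cell runs, so re-running it leaves the project root. Below is a re-runnable variant. It is a sketch that assumes the notebooks directory is literally named jupyter-notebooks. The helper `chdir_to_project_root` is hypothetical and is not part of this project.

```python
import os


def chdir_to_project_root(notebooks_dir_name='jupyter-notebooks'):
    """Change to the parent of the notebooks directory, only while still inside it.

    Safe to re-run: once the working directory is the project root,
    it is left unchanged (hypothetical helper, assumes the directory name above).
    """
    cwd = os.getcwd()
    if os.path.basename(cwd) == notebooks_dir_name:
        os.chdir(os.path.dirname(cwd))
    return os.getcwd()
```

Running the cell a second time returns the same path instead of moving up again.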

List the available pipelines


In [25]:
!dpp


Available Pipelines:
- ./knesset/kns_knessetdates 
- ./knesset/kns_govministry (*)
- ./knesset/kns_itemtype (*)
- ./knesset/kns_status (*)
- ./members/kns_person 
- ./members/kns_position 
- ./members/kns_persontoposition 
- ./members/kns_mksitecode 
- ./members/mk_individual 
- ./members/presence (*)
- ./laws/kns_law (*)
- ./laws/kns_law_binding (*)
- ./laws/kns_document_law (*)
- ./laws/kns_israel_law (*)
- ./laws/kns_israel_law_name (*)
- ./laws/kns_israel_law_ministry (*)
- ./laws/kns_israel_law_classification (*)
- ./laws/kns_israel_law_binding (*)
- ./votes/view_vote_mk_individual (*)
- ./votes/vote_result_type (*)
- ./votes/vote_rslts_kmmbr_shadow (E)
	Dirty dependency: Cannot run until dependency is executed: ./votes/vote_result_type
- ./votes_kmember/join_votes_shadow_mk (*)
- ./plenum/kns_plenumsession (*)
- ./plenum/kns_plmsessionitem (*)
- ./plenum/kns_documentplenumsession (*)
- ./bills/kns_bill (*)
- ./bills/kns_billname (*)
- ./bills/kns_billinitiator (*)
- ./bills/kns_billhistoryinitiator (*)
- ./bills/kns_billsplit 
- ./bills/kns_billunion (*)
- ./bills/kns_documentbill (*)
- ./committees/kns_committee 
- ./committees/kns_jointcommittee (*)
- ./committees/kns_cmtsitecode (*)
- ./committees/kns_cmtsessionitem (*)
- ./committees/kns_documentcommitteesession 
- ./committees/sync-documents (*)
- ./committees/kns_committeesession (E)
	Dirty dependency: Cannot run until dependency is executed: ./committees/kns_cmtsessionitem
- ./committees/dist/build_positions 
- ./committees/dist/create_factions 
- ./lobbyists/v_lobbyist (*)
- ./lobbyists/v_lobbyist_clients (*)
- ./people/source-data-stats (*)
- ./people/committee-meeting-attendees 
- ./people/committee-meeting-attendees-mks-stats (*)
- ./people/committee-meeting-attendees-mks-full-stats 
- ./people/attendance/committee-meetings 
- ./people/attendance/committee-meetings-export 
- ./votes/view_vote_rslts_hdr_approved 
- ./committees/background_material_titles 
- ./committees/dist/build (*)
- ./committees/dist/render_meetings (*)(E)
	Dirty dependency: Cannot run until dependency is executed: ./committees/dist/build
- ./committees/dist/render_committees (*)(E)
	Dirty dependency: Cannot run until dependency is executed: ./committees/dist/render_meetings
	Dirty dependency: Cannot run until dependency is executed: ./committees/dist/build
- ./committees/dist/create_members 
- ./committees/dist/sync-dist (*)(E)
	Dirty dependency: Cannot run until dependency is executed: ./committees/dist/render_meetings
	Dirty dependency: Cannot run until dependency is executed: ./committees/dist/render_committees
- ./web_ui/meetings 
- ./people/plenum-session-voters (*)(E)
	Dirty dependency: Cannot run until dependency is executed: ./plenum/kns_plenumsession
- ./people/plenum-session-voters-stats (E)
	Dirty dependency: Cannot run until dependency is executed: ./plenum/kns_plenumsession
	Dirty dependency: Cannot run until dependency is executed: ./people/plenum-session-voters
- ./people/mk-voted-against-majority 
- ./people/mk-party-discipline-stats (*)
- ./people/mk-party-discipline-reports 
- ./people/mk-party-discipline-reports-export 
- ./people/attendance/plenum-votes (*)(E)
	Dirty dependency: Cannot run until dependency is executed: ./people/plenum-session-voters
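As far as we can tell from the dpp CLI, `(*)` marks a dirty pipeline (one that needs to run again). `(E)` marks a pipeline with validation errors, and the reasons follow on tab-indented lines. The sketch below parses this listing into a dict and assumes exactly that format. The function name `parse_dpp_listing` is hypothetical.

```python
import re

# "- <pipeline id> <markers>", where markers may be "(*)", "(E)" or "(*)(E)"
PIPELINE_LINE = re.compile(r'^- (\S+)\s*(\(\*\))?(\(E\))?\s*$')


def parse_dpp_listing(text):
    """Map each pipeline id to its dirty/error flags and indented error messages."""
    pipelines = {}
    current = None
    for line in text.splitlines():
        match = PIPELINE_LINE.match(line)
        if match:
            current = match.group(1)
            pipelines[current] = {
                'dirty': match.group(2) is not None,
                'error': match.group(3) is not None,
                'messages': [],
            }
        elif line.startswith('\t') and current is not None:
            # tab-indented lines describe errors of the preceding pipeline
            pipelines[current]['messages'].append(line.strip())
    return pipelines
```

In IPython, `listing = !dpp` captures the output as a list of lines. `parse_dpp_listing('\n'.join(listing))` should then let you filter, for example, the pipelines flagged with `(E)`.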

Run a pipeline

The following runs the ./committees/kns_committee pipeline, which downloads committees from the Knesset API:


In [3]:
!dpp run --verbose ./committees/kns_committee


[./committees/kns_committee:T_0] >>> INFO    :b7832488 RUNNING ./committees/kns_committee
[./committees/kns_committee:T_0] >>> INFO    :b7832488 Collecting dependencies
[./committees/kns_committee:T_0] >>> INFO    :b7832488 Running async task
[./committees/kns_committee:T_0] >>> INFO    :b7832488 Waiting for completion
[./committees/kns_committee:T_0] >>> INFO    :b7832488 Async task starting
[./committees/kns_committee:T_0] >>> INFO    :b7832488 Searching for existing caches
[./committees/kns_committee:T_0] >>> INFO    :b7832488 Building process chain:
[./committees/kns_committee:T_0] >>> INFO    :- ..datapackage_pipelines_knesset.dataservice.processors.add_dataservice_collection_resource
[./committees/kns_committee:T_0] >>> INFO    :- ..datapackage_pipelines_knesset.common.processors.throttle
[./committees/kns_committee:T_0] >>> INFO    :- knesset.dump_to_path
[./committees/kns_committee:T_0] >>> INFO    :- knesset.dump_to_sql
[./committees/kns_committee:T_0] >>> INFO    :- (sink)
[./committees/kns_committee:T_0] >>> INFO    :..datapackage_pipelines_knesset.dataservice.processors.add_dataservice_collection_resource: INFO    :Loading dataservice resource from service api method KNS_Committee
[./committees/kns_committee:T_0] >>> INFO    :..datapackage_pipelines_knesset.common.processors.throttle: INFO    :throttling resource kns_committee: sleep_seconds=0.01
[./committees/kns_committee:T_0] >>> INFO    :..datapackage_pipelines_knesset.dataservice.processors.add_dataservice_collection_resource: INFO    :Processed 722 rows
[./committees/kns_committee:T_0] >>> INFO    :..datapackage_pipelines_knesset.common.processors.throttle: INFO    :Processed 722 rows
[./committees/kns_committee:T_0] >>> INFO    :knesset.dump_to_path: INFO    :Processed 722 rows
[./committees/kns_committee:T_0] >>> INFO    :knesset.dump_to_sql: INFO    :Processed 722 rows
[./committees/kns_committee:T_0] >>> INFO    :b7832488 DONE /home/ori/knesset-data-pipelines/committees/../datapackage_pipelines_knesset/common/processors/throttle.py
[./committees/kns_committee:T_0] >>> INFO    :b7832488 DONE /home/ori/virtualenvs/knesset-data-pipelines-fhQl9XOq/lib/python3.6/site-packages/datapackage_pipelines/manager/../lib/internal/sink.py
[./committees/kns_committee:T_0] >>> INFO    :b7832488 DONE /home/ori/knesset-data-pipelines/datapackage_pipelines_knesset/processors/dump_to_sql.py
[./committees/kns_committee:T_0] >>> INFO    :b7832488 DONE /home/ori/knesset-data-pipelines/datapackage_pipelines_knesset/processors/dump_to_path.py
[./committees/kns_committee:T_0] >>> INFO    :b7832488 DONE /home/ori/knesset-data-pipelines/committees/../datapackage_pipelines_knesset/dataservice/processors/add_dataservice_collection_resource.py
[./committees/kns_committee:T_0] >>> INFO    :b7832488 DONE V ./committees/kns_committee {'.dpp': {'out-datapackage-url': '../data/committees/kns_committee/datapackage.json'}, 'bytes': None, 'count_of_rows': 722, 'dataset_name': '_', 'hash': '0b97bbeb94b60d47b8201d3424cc95b3'}
INFO    :RESULTS:
INFO    :SUCCESS: ./committees/kns_committee {'bytes': None, 'count_of_rows': 722, 'dataset_name': '_', 'hash': '0b97bbeb94b60d47b8201d3424cc95b3'}
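The last lines summarize each pipeline's outcome. You may want to script runs outside the notebook. The sketch below invokes the same command with `subprocess` and extracts the statistics dict from the result lines. It assumes the `INFO    :SUCCESS: <pipeline id> {...}` format shown above. Only the SUCCESS label appears in this output, so matching any other status word is an assumption, and the function names are hypothetical.

```python
import ast
import re
import subprocess

# e.g. "INFO    :SUCCESS: ./committees/kns_committee {'count_of_rows': 722, ...}"
RESULT_LINE = re.compile(r'^INFO\s*:(\w+): (\S+) (\{.*\})\s*$')


def parse_results(log_text):
    """Return {pipeline_id: (status, stats_dict)} for each result line in the log."""
    results = {}
    for line in log_text.splitlines():
        match = RESULT_LINE.match(line)
        if match:
            status, pipeline_id, stats = match.groups()
            # the stats are printed as a Python dict literal (None, quoted strings)
            results[pipeline_id] = (status, ast.literal_eval(stats))
    return results


def run_pipeline(pipeline_id):
    """Run `dpp run --verbose <pipeline_id>` and parse its result lines."""
    proc = subprocess.run(
        ['dpp', 'run', '--verbose', pipeline_id],
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,  # logs may be written to stderr, so merge both streams
        universal_newlines=True,   # decode output as text (works on Python 3.6)
    )
    return proc.returncode, parse_results(proc.stdout)
```

For example, `code, results = run_pipeline('./committees/kns_committee')` should yield a stats dict whose `count_of_rows` matches the log above.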

Inspect the output datapackage descriptor

Pipelines use datapackages as their primary input and output format.

Pipeline and datapackage names usually match, so the output of the ./committees/kns_committee pipeline is available locally at ./data/committees/kns_committee/datapackage.json.
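The naming convention can be captured in a small helper. This is a sketch only. It relies on the "usually match" convention above, so it may not hold for every pipeline. The helper name `datapackage_path` is hypothetical.

```python
import os


def datapackage_path(pipeline_id, data_dir='data'):
    """Guess the output datapackage.json path for a pipeline id.

    Assumes the usual convention that pipeline and datapackage names match, e.g.
    ./committees/kns_committee -> ./data/committees/kns_committee/datapackage.json
    """
    relative = os.path.normpath(pipeline_id)  # drops the leading "./"
    return os.path.join('.', data_dir, relative, 'datapackage.json')
```

For `./committees/kns_committee` this gives the same path that is stored in `KNS_COMMITTEE_DATAPACKAGE_PATH` in the next cell.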


In [16]:
KNS_COMMITTEE_DATAPACKAGE_PATH = './data/committees/kns_committee/datapackage.json'

Each package may contain multiple resources. Let's see which resource names are available in the kns_committee package:


In [17]:
from datapackage import Package

kns_committee_package = Package(KNS_COMMITTEE_DATAPACKAGE_PATH)
kns_committee_package.resource_names


Out[17]:
['kns_committee']
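The descriptor is a plain JSON file, so you can also read the resource names without the datapackage library. The following is a minimal sketch that uses only the standard library. The helper name is hypothetical.

```python
import json


def resource_names(datapackage_path):
    """Return the names of the resources listed in a datapackage.json descriptor."""
    with open(datapackage_path, encoding='utf-8') as f:
        descriptor = json.load(f)
    return [resource['name'] for resource in descriptor.get('resources', [])]
```

Called on `KNS_COMMITTEE_DATAPACKAGE_PATH`, it should return the same `['kns_committee']` list as the cell above.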

In [18]:
KNS_COMMITTEE_RESOURE_NAME = 'kns_committee'

Inspect the kns_committee resource descriptor, which includes metadata and field descriptions (the descriptions are in Hebrew):


In [19]:
import yaml

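# dump the resource descriptor as YAML; allow_unicode keeps the Hebrew descriptions readable
print(yaml.dump(kns_committee_package.get_resource(KNS_COMMITTEE_RESOURE_NAME).descriptor,
                allow_unicode=True, default_flow_style=False))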


bytes: 171534
count_of_rows: 722
dialect:
  delimiter: ','
  doubleQuote: true
  lineTerminator: "\r\n"
  quoteChar: '"'
  skipInitialSpace: false
encoding: utf-8
format: csv
hash: 8cd3233b4ccb641bf5d0b9d2eee70916
name: kns_committee
path: kns_committee.csv
profile: data-resource
schema:
  fields:
  - description: קוד הוועדה
    name: CommitteeID
    type: integer
  - description: שם הוועדה
    name: Name
    type: string
  - description: קוד הקטגוריה של הוועדה
    name: CategoryID
    type: integer
  - description: 'תיאור הקטגוריה של הוועדה בכל כנסת, כל הוועדות מוקמות מחדש. השדה
      קטגוריה כולל את רשימת הקטגוריות הנושאיות שאליהן משויכות הוועדות. למשל הקטגוריה
      של ועדת הפנים והגנת הסביבה היא "פנים" וכך היה גם כאשר שם הוועדה היה ועדת הפנים
      ואיכות הסביבה. גם ועדות המשנה של כל ועדה משויכות לקטגוריה שלה. מדובר בשיוך נושאי
      של הוועדות.

      '
    name: CategoryDesc
    type: string
  - description: מספר הכנסת
    name: KnessetNum
    type: integer
  - description: קוד סוג הוועדה
    name: CommitteeTypeID
    type: integer
  - description: 'תיאור סוג הוועדה (ראשית, מיוחדת, משנה, משותפת, הכנסת)

      '
    name: CommitteeTypeDesc
    type: string
  - description: כתובת הדוא"ל של הוועדה
    name: Email
    type: string
  - description: תאריך התחלה
    format: '%Y-%m-%d %H:%M:%S'
    name: StartDate
    type: datetime
  - description: תאריך סיום
    format: '%Y-%m-%d %H:%M:%S'
    name: FinishDate
    type: datetime
  - description: קוד סוג משנה של הוועדה
    name: AdditionalTypeID
    type: integer
  - description: תיאור סוג משנה של הוועדה (קבועה, מיוחדת, חקירה)
    name: AdditionalTypeDesc
    type: string
  - description: קוד ועדת האם (רלוונטי רק לוועדת משנה)
    name: ParentCommitteeID
    type: integer
  - description: תיאור ועדת האם
    name: CommitteeParentName
    type: string
  - description: האם הוועדה פעילה?
    name: IsCurrent
    type: boolean
  - description: תאריך עדכון אחרון
    format: '%Y-%m-%d %H:%M:%S'
    name: LastUpdatedDate
    type: datetime
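The schema is easier to scan with one line per field. The sketch below summarizes the fields of a resource descriptor. It assumes the descriptor is a plain dict shaped like the output above. In Table Schema, a field without `type` defaults to `string` and a field without `format` defaults to `default`, and the sketch fills in those defaults.

```python
def summarize_fields(resource_descriptor):
    """Return (name, type, format) tuples for each field in a resource schema."""
    fields = resource_descriptor.get('schema', {}).get('fields', [])
    return [
        # fall back to the Table Schema defaults when type/format are omitted
        (field['name'], field.get('type', 'string'), field.get('format', 'default'))
        for field in fields
    ]
```

In the notebook, you can pass it `kns_committee_package.get_resource(KNS_COMMITTEE_RESOURE_NAME).descriptor` and print one tuple per line.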

Print the first 5 rows of data:


In [23]:
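from itertools import islice

# read rows lazily as dicts and stop after the first 5, instead of iterating over all 722 rows
rows = kns_committee_package.get_resource(KNS_COMMITTEE_RESOURE_NAME).iter(keyed=True)
for i, row in enumerate(islice(rows, 5), 1):
    print(f'-- row {i} --')
    print(yaml.dump(row, allow_unicode=True, default_flow_style=False))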


-- row 1 --
AdditionalTypeDesc: קבועה
AdditionalTypeID: 991
CategoryDesc: ועדת הכנסת
CategoryID: 1
CommitteeID: 1
CommitteeParentName: null
CommitteeTypeDesc: ועדת הכנסת
CommitteeTypeID: 70
Email: vadatk@knesset.gov.il
FinishDate: null
IsCurrent: true
KnessetNum: 15
LastUpdatedDate: 2017-04-24 16:47:06
Name: הכנסת
ParentCommitteeID: null
StartDate: 1999-06-07 00:00:00

-- row 2 --
AdditionalTypeDesc: קבועה
AdditionalTypeID: 991
CategoryDesc: ועדת הכספים
CategoryID: 2
CommitteeID: 2
CommitteeParentName: null
CommitteeTypeDesc: ועדה ראשית
CommitteeTypeID: 71
Email: null
FinishDate: null
IsCurrent: true
KnessetNum: 15
LastUpdatedDate: 2015-03-20 12:02:57
Name: הכספים
ParentCommitteeID: null
StartDate: 1999-06-07 00:00:00

-- row 3 --
AdditionalTypeDesc: קבועה
AdditionalTypeID: 991
CategoryDesc: ועדת החוץ והביטחון
CategoryID: 4
CommitteeID: 3
CommitteeParentName: null
CommitteeTypeDesc: ועדה ראשית
CommitteeTypeID: 71
Email: null
FinishDate: null
IsCurrent: true
KnessetNum: 15
LastUpdatedDate: 2015-03-20 12:02:57
Name: החוץ והבטחון
ParentCommitteeID: null
StartDate: 1999-06-07 00:00:00

-- row 4 --
AdditionalTypeDesc: קבועה
AdditionalTypeID: 991
CategoryDesc: ועדת הכלכלה
CategoryID: 3
CommitteeID: 4
CommitteeParentName: null
CommitteeTypeDesc: ועדה ראשית
CommitteeTypeID: 71
Email: null
FinishDate: null
IsCurrent: true
KnessetNum: 15
LastUpdatedDate: 2015-03-20 12:02:57
Name: הכלכלה
ParentCommitteeID: null
StartDate: 1999-06-07 00:00:00

-- row 5 --
AdditionalTypeDesc: קבועה
AdditionalTypeID: 991
CategoryDesc: ועדת הפנים והגנת הסביבה
CategoryID: 5
CommitteeID: 5
CommitteeParentName: null
CommitteeTypeDesc: ועדה ראשית
CommitteeTypeID: 71
Email: null
FinishDate: null
IsCurrent: true
KnessetNum: 15
LastUpdatedDate: 2015-03-20 12:02:57
Name: הפנים ואיכות הסביבה
ParentCommitteeID: null
StartDate: 1999-06-07 00:00:00
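The rows above are already cast according to the schema: IDs are integers, IsCurrent is a boolean, and empty cells are null. To illustrate what that casting involves, here is a standard-library sketch that reads the resource CSV directly. It is not how the datapackage library works internally. It handles only the integer, boolean, datetime and string types used in this schema. It treats empty cells as missing and accepts the Table Schema default true/false values, and both behaviors are assumptions about how the CSV is written. The function names are hypothetical.

```python
import csv
import json
import os
from datetime import datetime

TRUE_VALUES = {'true', 'True', 'TRUE', '1'}
FALSE_VALUES = {'false', 'False', 'FALSE', '0'}


def cast_value(value, field):
    """Cast one CSV cell according to a Table Schema field (subset of types only)."""
    if value == '':
        return None  # empty cells are treated as missing values
    field_type = field.get('type', 'string')
    if field_type == 'integer':
        return int(value)
    if field_type == 'boolean':
        if value in TRUE_VALUES:
            return True
        if value in FALSE_VALUES:
            return False
        raise ValueError(f'not a boolean: {value!r}')
    if field_type == 'datetime':
        fmt = field.get('format', 'default')
        if fmt == 'default':
            fmt = '%Y-%m-%dT%H:%M:%SZ'  # ISO 8601, the Table Schema default
        return datetime.strptime(value, fmt)
    return value  # strings (and unhandled types) are returned unchanged


def iter_rows(datapackage_path, resource_name, limit=None):
    """Yield up to `limit` rows of a CSV resource as dicts of cast values."""
    with open(datapackage_path, encoding='utf-8') as f:
        descriptor = json.load(f)
    resource = next(r for r in descriptor['resources'] if r['name'] == resource_name)
    fields = {field['name']: field for field in resource['schema']['fields']}
    csv_path = os.path.join(os.path.dirname(datapackage_path), resource['path'])
    with open(csv_path, encoding=resource.get('encoding', 'utf-8'), newline='') as f:
        for i, row in enumerate(csv.DictReader(f)):
            if limit is not None and i >= limit:
                break
            yield {name: cast_value(value, fields.get(name, {})) for name, value in row.items()}
```

For example, `iter_rows(KNS_COMMITTEE_DATAPACKAGE_PATH, KNS_COMMITTEE_RESOURE_NAME, limit=5)` should produce rows comparable to the output above.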

